/*******************************************************************************
 * Package: com.song.fafka
 * Type:    KafkaReceiver
 * Date:    2023-12-28 22:18
 *
 * Copyright (c) 2023 LTD All Rights Reserved.
 *
 * You may not use this file except in compliance with the License.
 *******************************************************************************/
package com.song.fafka;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

import java.util.Optional;

/**
 * Kafka message consumer.
 *
 * <p>Listens on topic {@code test1} as a member of consumer group
 * {@code test_consumer_group}, logs the value of each record it receives and
 * then manually acknowledges the record.
 *
 * @author Songxianyang
 * @date 2023-12-28 22:18
 */
@Component
@Slf4j
public class KafkaReceiver {

    /**
     * Handles a single record delivered from topic {@code test1}.
     *
     * <p>The record is acknowledged only after it has been handled, and it is
     * acknowledged even when its value is {@code null}, so that such records
     * are not left unacknowledged by this listener.
     *
     * @param record         the record delivered by the listener container; its
     *                       value may be {@code null}
     * @param acknowledgment handle used to manually acknowledge the record;
     *                       presumably the listener container is configured with
     *                       a manual ack mode - confirm in the consumer configuration
     */
    // NOTE(review): the original comment stated that the group "test_consumer_group"
    // gets registered in ZooKeeper. That matches the legacy consumer; with the current
    // Kafka consumer client, group state is presumably kept by the brokers - confirm
    // against the Kafka version in use.
    @KafkaListener(groupId = "test_consumer_group", topics = {"test1"})
    public void listen(ConsumerRecord<?, ?> record, Acknowledgment acknowledgment) {
        Object message = record.value();

        if (message == null) {
            // Nothing to consume, but the record must still be acknowledged below.
            log.warn("Skipping record with null value: topic={}, partition={}, offset={}",
                    record.topic(), record.partition(), record.offset());
        } else {
            log.info(">>>>>>>>>>>>>>>>>>>>>> 成功消费的消息为:{}<<<<<<<<<<<<<<<<<<<<<<<", message);
        }

        // Manual acknowledgment, performed after handling so that a failure while
        // handling the record does not acknowledge it.
        acknowledgment.acknowledge();
    }
}
